package com.cantor.consumer.handler;

import com.cantor.consumer.netty.RetryPolicy;
import com.cantor.consumer.start.ConsumerNettyKeeper;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;

/**
 * 重连处理器
 * 当Channel的channelInactive事件被触发, 就通知ConsumerNettyKeeper.addConnection()来重新往map中添加Channel.
 */
@Slf4j
public class ReconnectHandler extends ChannelInboundHandlerAdapter {

    private String host;

    private Integer port;

    private int retries = 0;

    private RetryPolicy retryPolicy;

    public ReconnectHandler(String host, Integer port, RetryPolicy retryPolicy) {
        this.host = host;
        this.port = port;
        this.retryPolicy = retryPolicy;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("Successfully established a connection to the server.");
        retries = 0;
        ctx.fireChannelActive();
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        if (retries == 0) {
            log.error("Lost the TCP connection with the server.");
            ctx.close();
        }

        boolean allowRetry = retryPolicy.allowRetry(retries);
        if (allowRetry) {

            long sleepTimeMs = retryPolicy.getSleepTimeMs(retries);

            log.info("Try to reconnect to the server after {}ms. Retry count: {}.", sleepTimeMs, ++retries);

            final EventLoop eventLoop = ctx.channel().eventLoop();
            eventLoop.schedule(() -> {
                log.debug("Reconnecting to {}:{} ...",host,port);
                ConsumerNettyKeeper.addConnection(host,port); // 关键代码
            }, sleepTimeMs, TimeUnit.MILLISECONDS);
        }
        ctx.fireChannelInactive();
    }

}
